Partitioning by column in Apache Spark to S3
We have a use case where we read files containing JSON from S3. Then, based on a particular JSON node value, we want to group the data and write it back to S3.
I am able to read the data, but I can't find good examples of how to partition the data based on a JSON key and then upload it to S3. Can anyone provide an example or point me to a tutorial that covers this use case?
After creating the data frame, this is my schema:
root
|-- customer: struct (nullable = true)
| |-- customerId: string (nullable = true)
|-- experiment: string (nullable = true)
|-- expiryTime: long (nullable = true)
|-- partitionKey: string (nullable = true)
|-- programId: string (nullable = true)
|-- score: double (nullable = true)
|-- startTime: long (nullable = true)
|-- targetSets: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- featured: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- data: struct (nullable = true)
| | | | | |-- asinId: string (nullable = true)
| | | | |-- pk: string (nullable = true)
| | | | |-- type: string (nullable = true)
| | |-- reason: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- recommended: array (nullable = true)
| | | |-- element: string (containsNull = true)
I want to partition the data based on a random hash of the customerId column. But when I do:
df.write.partitionBy("customerId").save("s3/bucket/location/to/save");
it gives the error:
org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true));
Please let me know how I can access the customerId column.
Let's take the dataset sample.json as an example:
{"CUST_ID":"115734","CITY":"San Jose","STATE":"CA","ZIP":"95106"}
{"CUST_ID":"115728","CITY":"Allentown","STATE":"PA","ZIP":"18101"}
{"CUST_ID":"115730","CITY":"Allentown","STATE":"PA","ZIP":"18101"}
{"CUST_ID":"114728","CITY":"San Mateo","STATE":"CA","ZIP":"94401"}
{"CUST_ID":"114726","CITY":"Somerset","STATE":"NJ","ZIP":"8873"}
Now let's start working on it with Spark:
val jsonDf = spark.read
.format("json")
.load("path/of/sample.json")
jsonDf.show()
+---------+-------+-----+-----+
| CITY|CUST_ID|STATE| ZIP|
+---------+-------+-----+-----+
| San Jose| 115734| CA|95106|
|Allentown| 115728| PA|18101|
|Allentown| 115730| PA|18101|
|San Mateo| 114728| CA|94401|
| Somerset| 114726| NJ| 8873|
+---------+-------+-----+-----+
Then partition the dataset by the "ZIP" column and write it to S3:
jsonDf.write
.partitionBy("ZIP")
.save("s3/bucket/location/to/save")
// one-liner authentication to S3
//.save(s"s3n://$accessKey:$secretKey" + "@" + s"$bucketName/location/to/save")
Note: For this code to run successfully, the S3 access and secret keys have to be configured properly. Check this answer for Spark/Hadoop integration with S3.
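As an alternative to embedding credentials in the path, here is a minimal sketch that sets them on the Hadoop configuration instead, assuming the s3a:// connector is on the classpath and the keys are available as environment variables (the variable names below are just placeholders):

// Minimal sketch: configure S3A credentials via the Hadoop configuration.
// In practice, prefer IAM instance profiles or credential providers over
// hard-coded keys.
val accessKey = sys.env("AWS_ACCESS_KEY_ID")
val secretKey = sys.env("AWS_SECRET_ACCESS_KEY")

spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)

jsonDf.write
  .partitionBy("ZIP")
  .save("s3a://bucket/location/to/save")

With the credentials set this way, the save path only needs the bucket and prefix, and the output is laid out under one ZIP=<value>/ subdirectory per distinct ZIP value.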
Edit: solution for "Partition column customerId not found in schema" (per the comments)
customerId lives inside the customer struct, so extract customerId first and then partition on it:
df.withColumn("customerId", $"customer.customerId")
.drop("customer")
.write.partitionBy("customerId")
.save("s3/bucket/location/to/save")
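If the goal really is a random-hash split into a fixed number of buckets, rather than one output directory per customer, one option is to derive a bucket column from a hash of customerId and partition on that instead. This is only a sketch using Spark's built-in hash and pmod functions; numBuckets = 16 is an arbitrary, hypothetical choice:

import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// Sketch: bucket rows by hashing customerId into numBuckets partitions,
// so the output has numBuckets directories instead of one per customer.
val numBuckets = 16

df.withColumn("customerId", col("customer.customerId"))
  .withColumn("bucket", pmod(hash(col("customerId")), lit(numBuckets)))
  .drop("customer")
  .write.partitionBy("bucket")
  .save("s3/bucket/location/to/save")

Each bucket then becomes a bucket=<n>/ subdirectory in the output, while all rows for a given customerId still land in the same bucket.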